%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_coap_SUITE).

-compile(export_all).
-compile(nowarn_export_all).

-import(emqx_gateway_test_utils,
        [ request/2
        , request/3
        ]).

-include_lib("er_coap_client/include/coap.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

%% Gateway configuration text handed to
%% emqx_common_test_helpers:load_config/2 in init_per_suite/1.
%% It sets `connection_required = true' and binds the default UDP listener
%% to port 5683, which is the port do/1 opens its client channel against.
-define(CONF_DEFAULT, <<"
gateway.coap
{
    idle_timeout = 30s
    enable_stats = false
    mountpoint = \"\"
    notify_type = qos
    connection_required = true
    subscribe_qos = qos1
    publish_qos = qos1

    listeners.udp.default
    {bind = 5683}
}
">>).

%% Log through ct:pal/2 with a "TEST_SUITE: " prefix prepended to Format.
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
%% Base URI the test cases prepend to a topic for publish/observe requests.
-define(PS_PREFIX, "coap://127.0.0.1/ps").
%% Base URI used for the "/connection" requests (connect, heartbeat, delete).
-define(MQTT_PREFIX, "coap://127.0.0.1/mqtt").


%% Common Test callback: the test case list for this suite is whatever
%% emqx_common_test_helpers:all/1 returns for this module.
all() ->
    emqx_common_test_helpers:all(?MODULE).

%% Common Test callback: load ?CONF_DEFAULT against emqx_gateway_schema,
%% then initialise the suite via emqx_mgmt_api_test_util with the
%% emqx_gateway application. The CT config is returned unchanged.
init_per_suite(Config) ->
    Apps = [emqx_gateway],
    ok = emqx_common_test_helpers:load_config(emqx_gateway_schema,
                                              ?CONF_DEFAULT),
    emqx_mgmt_api_test_util:init_suite(Apps),
    Config.

%% Common Test callback: remove the gateway.coap configuration (crashing
%% if removal does not return {ok, _}) and tear the suite down.
end_per_suite(_Config) ->
    ConfPath = [<<"gateway">>, <<"coap">>],
    {ok, _} = emqx:remove_config(ConfPath),
    emqx_mgmt_api_test_util:end_suite([emqx_gateway]).

%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------

%% Connect as client1, check that the client shows up in the gateway
%% channel registry, send a heartbeat (PUT on the connection resource),
%% disconnect, and check that the registry entry is gone again.
t_connection(_Config) ->
    ClientId = <<"client1">>,
    do(fun(Chann) ->
        %% open the connection and give the gateway time to register it
        Tok = connection(Chann),
        timer:sleep(100),
        Registered = emqx_gateway_cm_registry:lookup_channels(coap, ClientId),
        ?assertNotEqual([], Registered),

        %% heartbeat goes through er_coap_client directly, not the channel
        HeartbeatURI = ?MQTT_PREFIX
                       ++ "/connection?clientid=client1&token="
                       ++ Tok,
        ?LOGT("send heartbeat request:~ts~n", [HeartbeatURI]),
        {ok, changed, _} = er_coap_client:request(put, HeartbeatURI),

        %% close the connection and wait for the registry to be cleaned up
        disconnection(Chann, Tok),
        timer:sleep(100),
        Remaining = emqx_gateway_cm_registry:lookup_channels(coap, ClientId),
        ?assertEqual([], Remaining)
    end).

%% Publish a payload through the CoAP gateway and verify that a broker
%% subscriber (the test process itself) receives it.
%%
%% The test process subscribes to the topic, POSTs the payload to the
%% publish/subscribe URI for that topic and then waits up to 500 ms for
%% the `{deliver, Topic, Msg}' message. If nothing arrives in time the
%% test case is failed with an explicit reason naming the topic, instead
%% of an anonymous `?assert(false)'.
t_publish(_) ->
    Action = fun(Channel, Token) ->
        Topic = <<"/abc">>,
        Payload = <<"123">>,

        TopicStr = binary_to_list(Topic),
        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,

        %% Sub topic first
        emqx:subscribe(Topic),

        Req = make_req(post, Payload),
        {ok, changed, _} = do_request(Channel, URI, Req),

        receive
            %% Topic is already bound, so only deliveries for it match
            {deliver, Topic, Msg} ->
                ?assertEqual(Topic, Msg#message.topic),
                ?assertEqual(Payload, Msg#message.payload)
        after
            500 ->
                %% report why the test failed rather than a bare false
                ct:fail({deliver_timeout, Topic})
        end
    end,
    with_connection(Action).

%t_publish_authz_deny(_) ->
%    Action = fun(Channel, Token) ->
%        Topic = <<"/abc">>,
%        Payload = <<"123">>,
%        InvalidToken = lists:reverse(Token),
%
%        TopicStr = binary_to_list(Topic),
%        URI = ?PS_PREFIX ++
%              TopicStr ++
%              "?clientid=client1&token=" ++ InvalidToken,
%
%        %% Sub topic first
%        emqx:subscribe(Topic),
%
%        Req = make_req(post, Payload),
%        Result = do_request(Channel, URI, Req),
%        ?assertEqual({error, reset}, Result)
%    end,
%
%    with_connection(Action).

%% Observe a topic through the gateway (GET with observe option 0),
%% check that exactly one subscriber pid exists for it, publish a message
%% from the broker side and expect it back as a CoAP notification.
%% After the connection is closed the topic must have no subscribers.
t_subscribe(_Config) ->
    Topic = <<"/abc">>,
    Observe = fun(Chann, Tok) ->
        Body = <<"123">>,
        ObserveURI = ?PS_PREFIX
                     ++ binary_to_list(Topic)
                     ++ "?clientid=client1&token="
                     ++ Tok,
        ObserveReq = make_req(get, Body, [{observe, 0}]),
        {ok, content, _} = do_request(Chann, ObserveURI, ObserveReq),
        ?LOGT("observer topic:~ts~n", [Topic]),

        %% let the gateway finish creating the subscription
        timer:sleep(100),
        [Subscriber] = emqx:subscribers(Topic),
        ?assert(is_pid(Subscriber)),

        %% a broker-side publish should come back on the same channel
        emqx:publish(emqx_message:make(Topic, Body)),
        {ok, content, Notification} = with_response(Chann),
        ?LOGT("observer get Notif=~p", [Notification]),

        #coap_content{payload = Received} = Notification,
        ?assertEqual(Body, Received)
    end,

    with_connection(Observe),
    timer:sleep(100),

    ?assertEqual([], emqx:subscribers(Topic)).


%% Observe a topic (observe option 0), check that a subscriber exists,
%% then send the same GET with observe option 1 and check that the
%% subscriber list for the topic becomes empty.
t_un_subscribe(_Config) ->
    Topic = <<"/abc">>,
    Scenario = fun(Chann, Tok) ->
        Body = <<"123">>,
        TopicURI = ?PS_PREFIX
                   ++ binary_to_list(Topic)
                   ++ "?clientid=client1&token="
                   ++ Tok,

        %% start observing
        ObserveReq = make_req(get, Body, [{observe, 0}]),
        {ok, content, _} = do_request(Chann, TopicURI, ObserveReq),
        ?LOGT("observer topic:~ts~n", [Topic]),

        timer:sleep(100),
        [Subscriber] = emqx:subscribers(Topic),
        ?assert(is_pid(Subscriber)),

        %% stop observing; the gateway answers with `nocontent'
        CancelReq = make_req(get, Body, [{observe, 1}]),
        {ok, nocontent, _} = do_request(Chann, TopicURI, CancelReq),
        ?LOGT("un observer topic:~ts~n", [Topic]),
        timer:sleep(100),
        ?assertEqual([], emqx:subscribers(Topic))
    end,

    with_connection(Scenario).

%% Observe the wildcard topic "/abc/+" and expect a message published to
%% "/abc/def" to be delivered as a CoAP notification with the same payload.
t_observe_wildcard(_Config) ->
    Scenario = fun(Chann, Tok) ->
        %% only `+' is used: per the original note, the URI resolver
        %% cannot process a wildcard containing `#'
        Filter = <<"/abc/+">>,
        Body = <<"123">>,

        ObserveURI = ?PS_PREFIX
                     ++ binary_to_list(Filter)
                     ++ "?clientid=client1&token="
                     ++ Tok,
        ObserveReq = make_req(get, Body, [{observe, 0}]),
        {ok, content, _} = do_request(Chann, ObserveURI, ObserveReq),
        ?LOGT("observer topic:~ts~n", [Filter]),

        timer:sleep(100),
        [Subscriber] = emqx:subscribers(Filter),
        ?assert(is_pid(Subscriber)),

        %% publish on a concrete topic covered by the filter
        ConcreteTopic = <<"/abc/def">>,
        emqx:publish(emqx_message:make(ConcreteTopic, Body)),
        {ok, content, Notification} = with_response(Chann),

        ?LOGT("observer get Notif=~p", [Notification]),

        #coap_content{payload = Received} = Notification,
        ?assertEqual(Body, Received)
    end,

    with_connection(Scenario).

%% Exercise the gateway clients HTTP API while client1 is connected:
%% list, search by exact and fuzzy client id, look up a single client,
%% require all four views to be identical, then kick the client out and
%% expect the client list to be empty.
t_clients_api(_Config) ->
    Check = fun(_Chann, _Tok) ->
        ClientId = <<"client1">>,
        ListPath = "/gateway/coap/clients",
        ClientPath = "/gateway/coap/clients/client1",

        %% list: exactly one client, carrying our client id
        {200, #{data := [Listed]}} = request(get, ListPath),
        #{clientid := ClientId} = Listed,

        %% searching
        {200, #{data := [ByExactId]}} =
            request(get, ListPath, [{<<"clientid">>, ClientId}]),
        {200, #{data := [ByFuzzyId]}} =
            request(get, ListPath, [{<<"like_clientid">>, <<"cli">>}]),

        %% lookup
        {200, Fetched} = request(get, ClientPath),

        %% every variable is bound, so this chain asserts equality
        Listed = ByExactId = ByFuzzyId = Fetched,

        %% kickout
        {204, _} = request(delete, ClientPath),
        {200, #{data := []}} = request(get, ListPath)
    end,
    with_connection(Check).

%% Exercise the per-client subscriptions HTTP API for client1: the list
%% starts empty, a subscription to "tx" is created and shows up in the
%% list with the same topic, and after deleting it the list is empty again.
t_clients_subscription_api(_Config) ->
    Check = fun(_Chann, _Tok) ->
        SubsPath = "/gateway/coap/clients/client1/subscriptions",

        %% list
        {200, []} = request(get, SubsPath),

        %% create
        NewSub = #{topic => <<"tx">>, qos => 0, nl => 0, rap => 0, rh => 0},
        {201, Created} = request(post, SubsPath, NewSub),
        {200, [Listed]} = request(get, SubsPath),
        ?assertEqual(maps:get(topic, Created), maps:get(topic, Listed)),

        %% delete
        {204, _} = request(delete, SubsPath ++ "/tx"),
        {200, []} = request(get, SubsPath)
    end,
    with_connection(Check).

%%--------------------------------------------------------------------
%% helpers

%% POST to the connection resource as client1 (username admin, password
%% public) and return the payload of the `created' response as a string.
%% Callers append that string to later URIs as the `token' query value.
connection(Channel) ->
    ConnReq = make_req(post),
    ConnURI = ?MQTT_PREFIX
              ++ "/connection?clientid=client1&username=admin&password=public",
    {ok, created, Content} = do_request(Channel, ConnURI, ConnReq),
    #coap_content{payload = TokenBin} = Content,
    binary_to_list(TokenBin).

%% Send DELETE on the connection resource for client1 with the given
%% token. Crashes unless the gateway answers `{ok, deleted, _}'; that
%% matched tuple is the return value.
disconnection(Channel, Token) ->
    DeleteReq = make_req(delete),
    DeleteURI = ?MQTT_PREFIX
                ++ "/connection?clientid=client1&token="
                ++ Token,
    {ok, deleted, _} = do_request(Channel, DeleteURI, DeleteReq).

%% Build a confirmable (`con') CoAP request via er_coap_message:request/4.
%% Payload defaults to the empty binary and Opts to the empty list.
make_req(Method) ->
    make_req(Method, <<>>, []).

make_req(Method, Payload) ->
    make_req(Method, Payload, []).

make_req(Method, Payload, Opts) ->
    er_coap_message:request(con, Method, Payload, Opts).

%% Send Req over Channel to the resource named by URI and wait for the
%% answer. The path and query obtained from er_coap_client:resolve_uri/1
%% are prepended to the request's options as uri_path / uri_query before
%% sending. Returns whatever with_response/1 returns.
do_request(Channel, URI, Req = #coap_message{options = BaseOpts}) ->
    {_, _, UriPath, UriQuery} = er_coap_client:resolve_uri(URI),
    Addressed = Req#coap_message{
                  options = [{uri_path, UriPath},
                             {uri_query, UriQuery}
                             | BaseOpts]},
    ?LOGT("send request:~ts~nReq:~p~n", [URI, Addressed]),

    {ok, _} = er_coap_channel:send(Channel, Addressed),
    with_response(Channel).

%% Wait up to 2000 ms for a message addressed to Channel.
%% A `coap_response' is converted by return_response/2, a `coap_error'
%% with reason `reset' becomes {error, reset}, and if neither arrives in
%% time the result is {error, timeout}.
with_response(Channel) ->
    receive
        {coap_response, _ChId, Channel, _Ref,
         #coap_message{method = Status} = Reply} ->
            return_response(Status, Reply);
        {coap_error, _ChId, Channel, _Ref, reset} ->
            {error, reset}
    after 2000 ->
        {error, timeout}
    end.

%% Turn a response status and message into the tuples the tests match on:
%%   {ok, Status}               -> {ok, Status, Content}
%%   {error, Status}, no body   -> {error, Status}
%%   {error, Status}, with body -> {error, Status, Content}
%% where Content is er_coap_message:get_content/1 of the message.
return_response({ok, Status}, Reply) ->
    Content = er_coap_message:get_content(Reply),
    {ok, Status, Content};
return_response({error, Status}, #coap_message{payload = <<>>}) ->
    {error, Status};
return_response({error, Status}, Reply) ->
    Content = er_coap_message:get_content(Reply),
    {error, Status, Content}.

%% Run Fun(Channel) against the gateway endpoint 127.0.0.1:5683.
%%
%% Starts a client UDP socket, obtains a channel for the endpoint, calls
%% Fun with that channel and returns whatever Fun returns. The channel and
%% the socket are closed in an `after' section, so the cleanup also runs
%% when Fun raises (for example on a failed assertion or match); the
%% exception itself still propagates to the caller.
do(Fun) ->
    ChId = {{127, 0, 0, 1}, 5683},
    {ok, Sock} = er_coap_udp_socket:start_link(),
    {ok, Channel} = er_coap_udp_socket:get_channel(Sock, ChId),
    try
        %% send and receive
        Fun(Channel)
    after
        %% terminate the processes
        er_coap_channel:close(Channel),
        er_coap_udp_socket:close(Sock)
    end.

%% Run Action(Channel, Token) inside an established client1 connection:
%% connect, pause 100 ms, run Action, disconnect, pause 100 ms again.
%% The channel itself is provided and torn down by do/1.
with_connection(Action) ->
    do(fun(Chann) ->
        Tok = connection(Chann),
        timer:sleep(100),
        Action(Chann, Tok),
        disconnection(Chann, Tok),
        timer:sleep(100)
    end).
